package base_spark


import base_spark.util.Utils
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Demonstrates several ways of computing averages (plus basic set
 * operations) over Spark RDDs, running on a local master.
 *
 * NOTE(review): the SparkContext is created eagerly in the constructor
 * and is never stopped — acceptable for a demo, not for production.
 */
class ObjAvg {
    val conf: SparkConf = new SparkConf()
    conf.setMaster("local[*]")
        .setAppName("计算平均值")
    val sc = new SparkContext(conf)

    /**
     * Averages the integers 1..20 using `aggregate` with a
     * (sum, count) accumulator.
     *
     * @return the average as a Float
     */
    def objectAvg(): Float = {
        val (sum, count) = sc.parallelize(1 to 20)
            .aggregate((0, 0))(
                // seqOp: fold one element into the per-partition (sum, count)
                (acc, item) => (acc._1 + item, acc._2 + 1),
                // combOp: merge two partition accumulators
                (a, b) => (a._1 + b._1, a._2 + b._2)
            )
        println((sum, count))
        val avg = sum / count.toFloat
        println(avg)
        // Last expression is the result — no explicit `return`,
        // and the division is computed once instead of twice.
        avg
    }

    /** Averages the numbers in a single text file (one number per line). */
    def singleFileAvg(): Unit = {
        val path = Utils.getResourcePath("/avg/1.txt")
        val (sum, count) = sc.textFile(path)
            .map(_.toFloat)
            .aggregate((0.0, 0.0))(
                (acc, number) => (acc._1 + number, acc._2 + 1),
                // FIX: was `acc2._2 + acc2._2`, which drops acc1's count and
                // doubles acc2's — wrong whenever there is more than one partition.
                (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
            )

        println((sum, count))
        println(sum / count)
    }

    /**
     * Averages the numbers contained in each file under a directory,
     * printing one (fileName, average) pair per file.
     *
     * FIX: the original mapped over the file *content String*, which
     * iterates characters — so only single-digit values were ever parsed,
     * and a Char was compared against "" (always true). We now split the
     * content into lines and parse each line as a number.
     */
    def wholeFilesAvg(): Unit = {
        val path = Utils.getResourcePath("/avg")
        sc.wholeTextFiles(path) // RDD[(fileName, fileContent)]
            .mapValues { content =>
                // "\\R" matches any line terminator (\n, \r\n, ...);
                // non-numeric / blank lines are skipped instead of crashing.
                val numbers = content
                    .split("\\R")
                    .map(_.trim)
                    .filter(_.nonEmpty)
                    .flatMap(s => scala.util.Try(s.toFloat).toOption)
                // Guard the empty case explicitly rather than dividing 0 by 0.
                if (numbers.isEmpty) Float.NaN else numbers.sum / numbers.length
            }
            .foreach(println)
    }

    /** Demonstrates intersection, union, subtract and join on pair RDDs. */
    def calSet(): Unit = {
        val rdd1: RDD[(Int, String)] = sc.parallelize(List((1, "pandas"), (2, "java")))
        val rdd2: RDD[(Int, String)] = sc.parallelize(List((1, "pandas"), (1, "hello world")))
        println("交集", rdd1.intersection(rdd2).collect().toList)
        println("合并", rdd1.union(rdd2).collect().toList)
        println("差集", rdd1.subtract(rdd2).collect().toList)
        println("join", rdd1.join(rdd2).collect().toList)
    }

    /** Per-key average of a pair RDD, shown via reduceByKey and combineByKey. */
    def pairAvg(): Unit = {
        val rdd = sc.parallelize(List(("hadoop", 6), ("hadoop", 4), ("spark", 2), ("spark", 6)))
        rdd.cache() // reused by both computations below

        // Variant 1: tag each value with a count of 1, then reduce (sum, count).
        rdd.mapValues((_, 1))
            .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
            .mapValues(x => x._1 / x._2.toFloat)
            .foreach(println)

        // Variant 2: combineByKey with an explicit (sum, count) accumulator.
        rdd.combineByKey(
            (v) => (v, 1), // createCombiner: first value seen for a key
            (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), // mergeValue: within a partition
            (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // mergeCombiners
        ).mapValues(x => x._1 / x._2.toFloat).foreach(println)
    }

    /** Average via groupByKey under a single artificial key (one number per line). */
    def test1(): Unit = {
        println(sc.textFile(Utils.getResourcePath("/avg/1.txt"))
            .map(line => ("ss", line))
            .groupByKey()
            .mapValues(item => item.map(_.trim.toInt))
            .mapValues(item => item.sum / item.size.toFloat)
            .collectAsMap()
        )
    }

    /** Average via flatMap/split then groupByKey under one artificial key. */
    def test2(): Unit = {
        println(
            sc.textFile(Utils.getResourcePath("/avg/1.txt"))
                .map(line => (line + " "))
                .flatMap(line => line.split(" "))
                .filter(_ != "")
                .map(_.trim.toInt)
                .map(("tempkey", _))
                .groupByKey()
                .mapValues(item => item.sum / item.size.toFloat)
                .collectAsMap()
        )
    }
}

/** Entry point: runs the groupByKey-based average demos. */
object Avg {
    def main(args: Array[String]): Unit = {
        val avg = new ObjAvg
        // Other demos on ObjAvg that can be enabled here:
        // objectAvg(), singleFileAvg(), wholeFilesAvg(), calSet(), pairAvg()
        avg.test2()
        avg.test1()
    }
}
